Full Resumability PR 3: Filtering the completed tasks #1997
Draft
oyilmaz-nvidia wants to merge 16 commits into
Signed-off-by: Onur Yilmaz <oyilmaz@nvidia.com>
PR Summary
Title
Skip already-completed tasks on pipeline re-run
Summary
Builds on the existing lineage checkpoint (DAG recording + BFS completion marking) to make pipelines resumable: when a stage's input task is already marked `completed` in the LMDB store, skip it.
Changes
- `nemo_curator/utils/lineage_store.py`: adds bulk `are_completed(udids)` at three layers: `LineageStore` (single read txn, snapshot-consistent), `LineageWriterActor` (proxy), and a module-level helper that mirrors the no-op gating of `record_lineage` / `mark_leaves_completed` (all-False when Ray isn't initialized or no actor is registered).
- `nemo_curator/stages/base.py`: new `ProcessingStage._filter_completed_tasks(tasks)` method, called as the first step of the default `process_batch` so completed tasks never reach `validate_input` or `process`. Empty `_udid` (source/unassigned tasks) is never filtered. Order is preserved for survivors.
Contract for stage authors
Stages that override `process_batch` (typically fan-in / fan-out variants) must call `self._filter_completed_tasks(tasks)` at the top of the override, the same contract as the existing `record_lineage` / `mark_leaves_completed` helpers. The inline guidance comment in `base.py` documents this.
Tests
- `tests/utils/test_lineage_store.py`: 10 new tests for bulk `are_completed`: empty input, all/none/mixed completed, unknown udids, empty-string udids, and module-level helper variants (no Ray, no actor, with actor).
- `tests/stages/common/test_base.py`: 6 new tests in `TestFilterCompletedTasks` covering filter behavior and `process_batch` integration (completed tasks never reach `process`; all-completed batch returns `[]` cleanly).
- `tests/pipelines/test_lineage_integration.py` + `tests/pipelines/_resumability_runner.py`: new end-to-end SIGINT/resume test. A subprocess runs a 4-stage 2000-task pipeline (fan-out → passthrough → chunked fan-in → slow writer) writing to a checkpoint path. The test sends `SIGINT` after 5 leaves complete, asserts partial completion in LMDB, then relaunches the runner against the same checkpoint and asserts all 4200 DAG nodes end up `completed`.
Test plan
- `pytest tests/utils/test_lineage_store.py -k are_completed`
- `pytest tests/stages/common/test_base.py::TestFilterCompletedTasks`
- `pytest tests/pipelines/test_lineage_integration.py::test_resumable_after_sigint` (~60s)
- `ruff check nemo_curator/ tests/`
- (… `checkpoint_path` isn't supplied; filter is a silent no-op).
Design notes
- `are_completed` uses a single LMDB read transaction and a single `ray.get` per stage batch, so the remote-call overhead is paid once per batch rather than once per task.
- `sync=True` durability: every committed `record_lineage` / `mark_leaves_completed` call survives a hard interrupt, while in-flight uncommitted writes are lost. This is exactly the situation the filter has to handle on resume: tasks whose completion was never committed are simply reprocessed.
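To make the filtering rules and the `process_batch` contract concrete, here is a minimal sketch of the behavior described above. It is not the code in `base.py` or `lineage_store.py`: `Task`, `InMemoryCompletionStore`, `filter_completed_tasks`, and `FanInStage` are hypothetical stand-ins, and an in-memory set replaces the LMDB store and the Ray actor. Only the rules themselves come from this PR: one bulk lookup per batch, empty `_udid` is never filtered, and survivor order is preserved.

```python
from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical stand-in for a pipeline task; only `_udid` matters here."""

    task_id: str
    _udid: str = ""  # empty means source/unassigned


class InMemoryCompletionStore:
    """Hypothetical stand-in for the LMDB-backed store (assumption, not the real API)."""

    def __init__(self, completed: set[str]) -> None:
        self._completed = set(completed)
        self.calls = 0  # counts bulk lookups, to show one call per batch

    def are_completed(self, udids: list[str]) -> list[bool]:
        # One bulk lookup for the whole batch; unknown or empty udids are False.
        self.calls += 1
        return [bool(u) and u in self._completed for u in udids]


def filter_completed_tasks(tasks: list[Task], store: InMemoryCompletionStore) -> list[Task]:
    """Drop tasks already marked completed, keeping survivors in input order."""
    if not tasks:
        return []
    flags = store.are_completed([t._udid for t in tasks])
    # A task with an empty _udid is never filtered, whatever the store says.
    return [t for t, done in zip(tasks, flags) if not (t._udid and done)]


class FanInStage:
    """Hypothetical stage that overrides process_batch and honors the contract."""

    def __init__(self, store: InMemoryCompletionStore) -> None:
        self.store = store

    def process_batch(self, tasks: list[Task]) -> list[Task]:
        # Contract: filter first, before any validation or processing.
        tasks = filter_completed_tasks(tasks, self.store)
        if not tasks:
            return []  # all-completed batch returns [] cleanly
        merged_id = "+".join(t.task_id for t in tasks)
        return [Task(task_id=merged_id)]
```

On a resumed run, a batch holding two completed and two pending tasks would reach the stage body with only the two pending tasks, after a single store lookup.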